查看原文
其他

SpringBoot RocketMQ 整合使用和监控

The following article is from zhisheng Author zhisheng

点击蓝色SpringForAll社区关注我哟


前提

通过前面两篇文章可以简单的了解 RocketMQ 和 安装 RocketMQ ,今天就将 SpringBoot 和 RocketMQ 整合起来使用。

创建项目

在 IDEA 创建一个 SpringBoot 项目,项目结构如下:

pom 文件

引入 RocketMQ 的一些相关依赖,最后的 pom 文件如下:

  1. <?xml version="1.0" encoding="UTF-8"?>

  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  3.    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  4.    <modelVersion>4.0.0</modelVersion>


  5.    <groupId>com.zhisheng</groupId>

  6.    <artifactId>rocketmq</artifactId>

  7.    <version>0.0.1-SNAPSHOT</version>

  8.    <packaging>jar</packaging>


  9.    <name>rocketmq</name>

  10.    <description>Demo project for Spring Boot RocketMQ</description>


  11.    <parent>

  12.        <groupId>org.springframework.boot</groupId>

  13.        <artifactId>spring-boot-starter-parent</artifactId>

  14.        <version>1.5.9.RELEASE</version>

  15.        <relativePath/> <!-- lookup parent from repository -->

  16.    </parent>


  17.    <properties>

  18.        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

  19.        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

  20.        <java.version>1.8</java.version>

  21.    </properties>


  22.    <dependencies>

  23.        <dependency>

  24.            <groupId>org.springframework.boot</groupId>

  25.            <artifactId>spring-boot-starter-web</artifactId>

  26.        </dependency>


  27.        <dependency>

  28.            <groupId>org.springframework.boot</groupId>

  29.            <artifactId>spring-boot-starter-test</artifactId>

  30.            <scope>test</scope>

  31.        </dependency>


  32.        <dependency>

  33.            <groupId>org.apache.rocketmq</groupId>

  34.            <artifactId>rocketmq-common</artifactId>

  35.            <version>4.2.0</version>

  36.        </dependency>


  37.        <dependency>

  38.            <groupId>org.apache.rocketmq</groupId>

  39.            <artifactId>rocketmq-client</artifactId>

  40.            <version>4.2.0</version>

  41.        </dependency>

  42.    </dependencies>


  43.    <build>

  44.        <plugins>

  45.            <plugin>

  46.                <groupId>org.springframework.boot</groupId>

  47.                <artifactId>spring-boot-maven-plugin</artifactId>

  48.            </plugin>

  49.        </plugins>

  50.    </build>

  51. </project>

配置文件

application.properties 中如下:

  1. # 消费者的组名

  2. apache.rocketmq.consumer.PushConsumer=PushConsumer

  3. # 生产者的组名

  4. apache.rocketmq.producer.producerGroup=Producer

  5. # NameServer地址

  6. apache.rocketmq.namesrvAddr=localhost:9876

生产者

  1. package com.zhisheng.rocketmq.client;


  2. import org.apache.rocketmq.client.producer.DefaultMQProducer;

  3. import org.apache.rocketmq.common.message.Message;

  4. import org.apache.rocketmq.remoting.common.RemotingHelper;

  5. import org.springframework.beans.factory.annotation.Value;

  6. import org.springframework.stereotype.Component;

  7. import org.springframework.util.StopWatch;


  8. import javax.annotation.PostConstruct;


  9. /**

  10. * Created by zhisheng_tian on 2018/2/6

  11. */

  12. @Component

  13. public class RocketMQClient {

  14.    /**

  15.     * 生产者的组名

  16.     */

  17.    @Value("${apache.rocketmq.producer.producerGroup}")

  18.    private String producerGroup;


  19.    /**

  20.     * NameServer 地址

  21.     */

  22.    @Value("${apache.rocketmq.namesrvAddr}")

  23.    private String namesrvAddr;


  24.    @PostConstruct

  25.    public void defaultMQProducer() {

  26.        //生产者的组名

  27.        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);

  28.        //指定NameServer地址,多个地址以 ; 隔开

  29.        producer.setNamesrvAddr(namesrvAddr);


  30.        try {

  31.            /**

  32.             * Producer对象在使用之前必须要调用start初始化,初始化一次即可

  33.             * 注意:切记不可以在每次发送消息时,都调用start方法

  34.             */

  35.            producer.start();


  36.               //创建一个消息实例,包含 topic、tag 和 消息体

  37.             //如下:topic 为 "TopicTest",tag 为 "push"

  38.            Message message = new Message("TopicTest", "push", "发送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));


  39.            StopWatch stop = new StopWatch();

  40.            stop.start();


  41.            for (int i = 0; i < 10000; i++) {

  42.                SendResult result = producer.send(message);

  43.                System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());

  44.            }

  45.            stop.stop();

  46.            System.out.println("----------------发送一万条消息耗时:" + stop.getTotalTimeMillis());

  47.        } catch (Exception e) {

  48.            e.printStackTrace();

  49.        } finally {

  50.            producer.shutdown();

  51.        }

  52.    }

  53. }

消费者

  1. package com.zhisheng.rocketmq.server;


  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

  5. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

  6. import org.apache.rocketmq.common.message.MessageExt;

  7. import org.apache.rocketmq.remoting.common.RemotingHelper;

  8. import org.springframework.beans.factory.annotation.Value;

  9. import org.springframework.stereotype.Component;


  10. import javax.annotation.PostConstruct;


  11. /**

  12. * Created by zhisheng_tian on 2018/2/6

  13. */

  14. @Component

  15. public class RocketMQServer {

  16.    /**

  17.     * 消费者的组名

  18.     */

  19.    @Value("${apache.rocketmq.consumer.PushConsumer}")

  20.    private String consumerGroup;


  21.    /**

  22.     * NameServer 地址

  23.     */

  24.    @Value("${apache.rocketmq.namesrvAddr}")

  25.    private String namesrvAddr;


  26.    @PostConstruct

  27.    public void defaultMQPushConsumer() {

  28.        //消费者的组名

  29.        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);


  30.        //指定NameServer地址,多个地址以 ; 隔开

  31.        consumer.setNamesrvAddr(namesrvAddr);

  32.        try {

  33.            //订阅PushTopic下Tag为push的消息

  34.            consumer.subscribe("TopicTest", "push");


  35.            //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费

  36.            //如果非第一次启动,那么按照上次消费的位置继续消费

  37.            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

  38.            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {

  39.                try {

  40.                    for (MessageExt messageExt : list) {


  41.                        System.out.println("messageExt: " + messageExt);//输出消息内容


  42.                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);


  43.                        System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//输出消息内容

  44.                    }

  45.                } catch (Exception e) {

  46.                    e.printStackTrace();

  47.                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试

  48.                }

  49.                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功

  50.            });

  51.            consumer.start();

  52.        } catch (Exception e) {

  53.            e.printStackTrace();

  54.        }

  55.    }

  56. }

启动类

  1. package com.zhisheng.rocketmq;


  2. import org.springframework.boot.SpringApplication;

  3. import org.springframework.boot.autoconfigure.SpringBootApplication;


  4. @SpringBootApplication

  5. public class RocketmqApplication {


  6.    public static void main(String[] args) {

  7.        SpringApplication.run(RocketmqApplication.class, args);

  8.    }

  9. }

RocketMQ

代码已经都写好了,接下来我们需要将与 RocketMQ 有关的启动起来。

启动 Name Server

在前面文章中已经写过怎么启动,http://www.54tianzhisheng.cn/2018/02/06/RocketMQ-install/#%E5%90%AF%E5%8A%A8-NameServer

进入到目录 :

  1. cd distribution/target/apache-rocketmq

启动:

  1. nohup sh bin/mqnamesrv &


  2. tail -f ~/logs/rocketmqlogs/namesrv.log //通过日志查看是否启动成功

启动 Broker

  1. nohup sh bin/mqbroker -n localhost:9876 &


  2. tail -f ~/logs/rocketmqlogs/broker.log    //通过日志查看是否启动成功

然后运行启动类,运行效果如下:

监控

RocketMQ有一个对其扩展的开源项目 ocketmq-console ,如今也提交给了 Apache ,地址在:

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

官方也给出了其支持的功能的中文文档:https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/100/UserGuide_CN.md , 

那么该如何安装?

Docker 安装

1、获取 Docker 镜像

  1. docker pull styletang/rocketmq-console-ng        

2、运行,注意将你自己的 NameServer 地址替换下面的 127.0.0.1

  1. docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng

非 Docker 安装

我们 git clone 一份代码到本地:

  1. git clone https://github.com/apache/rocketmq-externals.git


  2. cd rocketmq-externals/rocketmq-console/

需要 jdk 1.7 以上。执行以下命令:

  1. mvn spring-boot:run

或者

  1. mvn clean package -Dmaven.test.skip=true


  2. java -jar target/rocketmq-console-ng-1.0.0.jar

注意:

1、如果你下载依赖缓慢,你可以重新设置 maven 的 mirror 为阿里云的镜像

  1. <mirrors>

  2.    <mirror>

  3.          <id>alimaven</id>

  4.          <name>aliyun maven</name>

  5.          <url>http://maven.aliyun.com/nexus/content/groups/public/</url>

  6.          <mirrorOf>central</mirrorOf>        

  7.    </mirror>

  8. </mirrors>

2、如果你使用的 RocketMQ 版本小于 3.5.8,如果您使用 rocketmq < 3.5.8,请在启动 rocketmq-console-ng 时添加 -Dcom.rocketmq.sendMessageWithVIPChannel=false(或者您可以在 ops 页面中更改它)

3、更改 resource / application.properties 中的 rocketmq.config.namesrvAddr(或者可以在ops页面中更改它)

错误解决方法

1、Docker 启动项目报错

org.apache.rocketmq.remoting.exception.RemotingConnectException:connect to<null>failed

将 Docker 启动命令改成如下以后:

  1. docker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Drocketmq.config.isVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng

报错信息改变了,新的报错信息如下:

  1. ERROR op=global_exception_handler_print_error


  2. org.apache.rocketmq.console.exception.ServiceException: This date have't data!

看到网上有人也遇到这个问题,他们都通过自己的方式解决了,但是方法我都试了,不适合我。不得不说,阿里,你能再用心点吗?既然把 RocketMQ 捐给 Apache 了,这些文档啥的都必须更新啊,不要还滞后着呢,不然少不了被吐槽!

搞了很久这种方法没成功,暂时放弃!mmp

2、非 Docker 安装,只好把源码编译打包了。

1) 注意需要修改如下图中的配置:

  1. rocketmq.config.namesrvAddr=localhost:9876        //注意替换你自己的ip


  2. #如果你 rocketmq 版本小于 3.5.8 才需设置 `rocketmq.config.isVIPChannel` 为 false,默认是 true, 这个可以在源码中可以看到的

  3. rocketmq.config.isVIPChannel=

2) 执行以下命令:

  1. mvn clean package -Dmaven.test.skip=true

编译成功:

可以看到已经打好了 jar 包:

运行:

  1. java -jar rocketmq-console-ng-1.0.0.jar

成功,不报错了,开心😄,访问 http://localhost:8080/

整个监控大概就是这些了。

然后我运行之前的 SpringBoot 整合项目,查看监控信息如下:

总结

整篇文章讲述了 SpringBoot 与 RocketMQ 整合和 RocketMQ 监控平台的搭建。

参考文章

1.http://www.ymq.io/2018/02/02/spring-boot-rocketmq-example/#%E6%96%B0%E5%8A%A0%E9%A1%B9%E7%9B%AE

2.GitHub 官方 README


推荐阅读

点击原文阅读更多


点个“好看”支持一下鸭

点鸭点鸭点鸭

                                                                                                       ↓↓↓



    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存